"""Misc utility functions."""

import argparse
from collections import abc
import contextlib
from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import faulthandler
import fcntl
import logging
import os
import re
import signal
import subprocess
import tempfile
import typing
from urllib.parse import urlsplit
from warnings import warn

from dateutil.parser import parse

from .logger import get_logger
from .retrying import retrying_on_exception

# Generic element type used in the annotations of `flattened`.
T = typing.TypeVar('T')

# Module-level logger obtained from the package's logger helper.
LOGGER = get_logger(__name__)

# Matches human-readable durations such as "3m 5s". Every component is
# optional, but the components must appear in the order weeks, days, hours,
# minutes, seconds. The unit suffix of the seconds component may be omitted
# (`s?`), so a bare trailing number is captured as seconds. The numeric groups
# accept any mix of digits and dots and are not validated by the pattern.
_TIMEDELTA_REGEX = re.compile(r'((?P<weeks>[\.\d]+?) *(?:w|weeks?))? *'
                              r'((?P<days>[\.\d]+?) *(?:d|days?))? *'
                              r'((?P<hours>[\.\d]+?) *(?:h|hours?))? *'
                              r'((?P<minutes>[\.\d]+?) *(?:m|mins?|minutes?))? *'
                              r'((?P<seconds>[\.\d]+?) *(?:s?|secs?|seconds?))?')


class EnvVarNotSetError(Exception):
    """Raised when a required environment variable is missing."""


@contextlib.contextmanager
def only_log_exceptions(exceptions=Exception):
    """Run the managed block, logging and swallowing matching exceptions.

    Arguments:
        exceptions: exception class (or tuple of classes) to suppress;
                    any other exception propagates to the caller.

    """
    try:
        yield
    except exceptions:  # pylint: disable=broad-except
        # Log with traceback, then let execution continue after the block.
        LOGGER.exception('Ignored by context manager')


def flattened(lst: T | list[T] | list[typing.Any]) -> abc.Iterator[T]:
    """Return an iterator for the items of the flattened list.

    Nested lists are walked depth-first, left to right. Anything that is not
    a list (including tuples and strings) is yielded as a single item, so a
    non-list argument yields just itself.

    The traversal keeps an explicit stack of iterators instead of recursing,
    so arbitrarily deep nesting does not hit the interpreter recursion limit.
    """
    if not isinstance(lst, list):
        yield lst
        return

    # Each stack entry is a partially consumed iterator of one nesting level.
    pending = [iter(lst)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                # Descend; the outer iterator resumes once this level is done.
                pending.append(iter(item))
                break
            yield item
        else:
            # Current level exhausted, go back to the enclosing one.
            pending.pop()


def get_nested_key(
    data: dict[str, typing.Any] | list[typing.Any],
    key: str,
    default: typing.Any = None,
    *,
    lookup_attrs: bool = False,
    delimiter: str = '/',
) -> typing.Any:
    """Dig through nested dictionaries/lists to get the value of a key.

    Inputs a key slash-separated like key_a/key_b/key_c, and returns 'value' on
    the following chain: {'key_a': {'key_b': {'key_c': value}}} handling
    missing keys.

    The default delimiter is / to play well with dot-prefix "internal" keys used
    in quite some places.

    Setting lookup_attrs=True will also retrieve object attributes.
    """
    subkeys = key.split(delimiter)
    for subkey in subkeys:
        try:
            data = data[int(subkey)] if isinstance(data, list) else data[subkey]
        except (KeyError, TypeError, IndexError, ValueError):
            if not lookup_attrs:
                return default
            try:
                data = getattr(data, subkey)
            except AttributeError:
                return default

    return data


def set_nested_key(
    data: dict[str, typing.Any] | list[typing.Any],
    key: str,
    value: typing.Any,
) -> None:
    """Set key on nested dictionaries/lists.

    Handle setting subkeys using / as delimiter. If sub dictionary does not
    exists, it will be created.
    """
    subkeys = key.split('/')
    for i, subkey in enumerate(subkeys):
        # On the last subkey, set the value.
        if i == len(subkeys) - 1:
            if isinstance(data, list):
                data[int(subkey)] = value
            else:
                data[subkey] = value
            break

        # If not the last one, try to get or create subkey value.
        if isinstance(data, list):
            data = data[int(subkey)]
        else:
            if subkey not in data:
                data[subkey] = {}
            data = data[subkey]


def append_to_nested_list(data, key, element):
    """
    Append an element to a list stored at a nested key.

    The key uses / as delimiter between sub keys. Missing intermediate
    dictionaries are created, and so is the final list if it does not exist.

    Raises an Exception if the value found at the key is not a list.
    """
    target = get_nested_key(data, key, [])
    if isinstance(target, list):
        target += [element]
        # Write back so a freshly created list ends up in the structure.
        set_nested_key(data, key, target)
        return
    raise Exception('Element is not a list')


def get_env_var_or_raise(env_key):
    """Return the value of an environment variable that must be set.

    Args:
        env_key:      Name of the environment variable to look up.

    Returns:
        The variable's value; an empty string counts as set.

    Raises:
        EnvVarNotSetError if the variable is not set.

    """
    value = os.environ.get(env_key)
    if value is not None:
        return value
    raise EnvVarNotSetError(f'Environment variable {env_key} is not set!')


def booltostr(value: bool) -> str:
    """Return 'true' for truthy values and 'false' otherwise."""
    return 'true' if value else 'false'


def strtobool(value: str) -> bool:
    """Convert the strings true/True/false/False to the matching bool.

    Raises ValueError for any other value.
    """
    known = {'true': True, 'True': True, 'false': False, 'False': False}
    result = known.get(value)
    if result is None:
        raise ValueError(f'invalid truth value {value}')
    return result


def parse_timedelta(value: str) -> timedelta:
    """Parse a human-readable duration such as "3m 5s" into a timedelta.

    Raises an Exception if the string does not match the expected format or
    contains no duration component at all.
    """
    match = _TIMEDELTA_REGEX.fullmatch(value)
    if not match:
        raise Exception(f"Unable to parse time delta '{value}'")

    # Keep only the units that were actually present in the input.
    time_params = {}
    for unit, amount in match.groupdict().items():
        if amount:
            time_params[unit] = float(amount)

    if not time_params:
        raise Exception("Unable to parse empty time delta")
    return timedelta(**time_params)


def get_env_bool(name: str, default: bool = False) -> bool:
    """Read an environment variable as a bool, using default when unset.

    The value is interpreted by strtobool, which raises ValueError for
    anything other than true/True/false/False.
    """
    raw = os.environ.get(name, str(default))
    return strtobool(raw)


def get_env_int(name: str, default: int) -> int:
    """Read an environment variable as an int, using default when unset."""
    try:
        raw = os.environ[name]
    except KeyError:
        raw = str(default)
    return int(raw)


def is_production() -> bool:
    """Return True if the deployment environment name starts with 'production'."""
    environment = deployment_environment()
    return environment.startswith('production')


def is_staging() -> bool:
    """Return True if the deployment environment name starts with 'staging'."""
    environment = deployment_environment()
    return environment.startswith('staging')


def is_production_or_staging() -> bool:
    """Return True if the deployment environment is production or staging."""
    return any(check() for check in (is_production, is_staging))


def deployment_environment() -> str:
    """Return the value of CKI_DEPLOYMENT_ENVIRONMENT, or 'development' if unset.

    CKI code understands the following environments:
    - production: deployed into the CKI production environment
    - staging: deployed into the production-like CKI staging environment, which
      is nearly identical to production, with the following differences:
      - production-affecting code paths are disabled, e.g. email sending
    - development: anything else is considered a development environment:
      - production-affecting code paths are disabled
      - debugging code paths can be enabled, e.g. Django debug toolbar, Python
        debug webserver, ...
    """
    return os.environ.get('CKI_DEPLOYMENT_ENVIRONMENT', 'development')


@contextmanager
def tempfile_from_string(data):
    """Create a tempfile that is deleted on contextmanager exit.

    The file is written and closed before its path is yielded, so it can be
    reopened by name or handed to another process.

    Arguments:
        data: str or bytes, the content of the tempfile; a str is encoded as
              UTF-8 because the file is opened in binary mode

    Yields:
        str, the path of the temporary file

    """
    payload = data.encode('utf-8') if isinstance(data, str) else data
    with tempfile.NamedTemporaryFile(delete=False) as temp:
        try:
            temp.write(payload)
            temp.close()
            yield temp.name
        finally:
            # delete=False leaves removal to us; do it even when the write
            # above failed so no stray file is left behind.
            temp.close()
            os.unlink(temp.name)


@contextmanager
def enter_dir(directory):
    """Temporarily switch the working directory for the managed block.

    The previous working directory is restored on exit, also when the block
    raises.

    Arguments:
        directory: str, the directory to change into

    """
    previous = os.getcwd()
    try:
        os.chdir(directory)
        yield
    finally:
        # Always go back to where we started.
        os.chdir(previous)


def safe_popen(*args, stdin_data=None, **kwargs):
    """Open a process with specified arguments, keyword arguments, stdin data.

    This function blocks until the process finishes. Captured stdout/stderr
    bytes are decoded as utf-8; output that is already text (pipes opened in
    text mode) is returned unchanged. A stream that was not captured, or that
    produced no output, is returned as an empty string.

    Arguments:
        args:       arguments to pass to Popen
        stdin_data: None or data handed to Popen.communicate(); use None
                    when you don't want to pass data to stdin
        kwargs:     keyword arguments to pass to Popen
    Returns:
        tuple (stdout, stderr, returncode) where
            stdout is a string
            stderr is a string
            returncode is an integer

    """
    def _as_text(output):
        if not output:
            return ''
        # Text-mode pipes already deliver str, which has no decode().
        return output.decode('utf-8') if isinstance(output, bytes) else output

    with subprocess.Popen(*args, **kwargs) as subproc:
        stdout, stderr = subproc.communicate(stdin_data)
        return _as_text(stdout), _as_text(stderr), subproc.returncode


def read_stream(stream):
    """Return whatever can currently be read from a stream such as stdout.

    The underlying file descriptor is switched to non-blocking mode (and left
    that way) before reading. The data is decoded as utf-8; an empty string is
    returned when nothing was read or the read fails with an OSError.
    """
    descriptor = stream.fileno()
    current_flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)
    fcntl.fcntl(descriptor, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
    try:
        chunk = stream.read()
    except OSError:
        return ""
    return chunk.decode('utf-8') if chunk else ""


@retrying_on_exception(RuntimeError)
def retry_safe_popen(err_exc_strings, *args, stdin_data=None, **kwargs):
    """Run safe_popen and signal a retry when stderr contains a known error.

    The command is executed through safe_popen. If its stderr contains any of
    the strings in err_exc_strings, a RuntimeError is raised, which the
    retrying_on_exception decorator uses as the trigger to run the command
    again. Retry delay, retry limit and the behaviour after the last failed
    attempt are defined by that decorator (historically documented here as up
    to 3 retries, 3 seconds apart, without raising after the last failure).

    Args:
        err_exc_strings: a list of strings; if any is present in stderr,
                         retry the command
        args:            arguments to pass to Popen
        stdin_data:      None or data for the process' stdin, use None when
                         you don't want to pass anything to stdin
        kwargs:          keyword arguments to pass to Popen
    Returns:
        tuple (stdout, stderr, returncode) where
            stdout is a string
            stderr is a string
            returncode is an integer
    """
    stdout, stderr, returncode = safe_popen(*args, stdin_data=stdin_data, **kwargs)

    # The caller asked us to watch stderr, so make its content visible.
    if err_exc_strings and stderr:
        logging.warning(stderr.strip())

    for needle in err_exc_strings:
        if not stderr or needle not in stderr:
            continue
        logging.warning('caught "%s" error string in stderr', needle)
        raise RuntimeError

    return stdout, stderr, returncode


class StoreNameValuePair(argparse.Action):
    # pylint: disable=too-few-public-methods
    """Parse key=value arguments into a dictionary stored on the namespace.

    Repeated uses of the option accumulate into the same dictionary; a later
    occurrence of a key overrides an earlier one. Only the first = splits the
    pair, so values may contain = themselves.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Merge the given key=value pair(s) into the destination dictionary.

        Raises ValueError if a pair does not contain a = sign.
        """
        # Copy instead of updating in place: the current value may be the
        # `default` object registered on the parser, which must not be
        # modified or later parses would start from polluted defaults.
        variables = dict(getattr(namespace, self.dest, None) or {})
        if isinstance(values, str):
            values = [values]
        for key_value_pair in values:
            key, value = key_value_pair.split('=', 1)
            variables[key] = value
        setattr(namespace, self.dest, variables)


def sentry_init(sentry_sdk, **kwargs):
    """
    Initialize Sentry if the SENTRY_DSN env var is set to a non-empty value.

    sentry_sdk.init is called with ca_certs taken from REQUESTS_CA_BUNDLE and
    environment taken from the deployment environment; kwargs are added on
    top and may override both. If CI_JOB_URL is set, it is used as
    server_name and takes precedence over a server_name passed via kwargs.

    Independently of Sentry, the faulthandler module is connected to SIGUSR1
    to allow to obtain backtraces from running processes.
    """
    if os.getenv('SENTRY_DSN'):
        params = {
            'ca_certs': os.getenv('REQUESTS_CA_BUNDLE'),
            'environment': deployment_environment(),
            **kwargs,
        }
        ci_job_url = os.getenv('CI_JOB_URL')
        if ci_job_url:
            params['server_name'] = ci_job_url

        sentry_sdk.init(**params)

    faulthandler.register(signal.SIGUSR1)


def sentry_flush():
    """
    Send all events pending in sentry.

    Does nothing when sentry_sdk cannot be imported or when the current hub
    has no client, i.e. Sentry was not initialized.

    https://docs.sentry.io/platforms/python/configuration/draining/
    """
    try:
        from sentry_sdk import Hub  # pylint: disable=import-outside-toplevel
    except ImportError:
        return

    if (client := Hub.current.client) is not None:
        client.flush()


def now_tz_utc() -> datetime:
    """Return the current time as a tz-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)


def datetime_fromisoformat_tz_utc(date_string: str) -> datetime:
    """Parse a date string and return it as a tz-aware datetime in UTC.

    The string is parsed with dateutil; today's midnight in UTC is handed to
    the parser as its default datetime.
    """
    midnight_utc = now_tz_utc().replace(hour=0, minute=0, second=0, microsecond=0)
    return parse(date_string, default=midnight_utc).astimezone(timezone.utc)


def ensure_tz_utc(dt: datetime) -> datetime:
    """Return the given datetime as a tz-aware datetime in UTC.

    A naive datetime is taken to be in UTC already and only gets the timezone
    attached; an aware datetime is converted to UTC.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def utc_now_iso():
    """Return the current UTC time as an ISO 8601 string, as used by KCIDB."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def key_value_list_to_dict(kv_list):
    """Build a dictionary from a list of {key: key, value: value} elements.

    A falsy kv_list (None, empty list) gives an empty dictionary; for
    duplicate keys the last element wins.
    """
    entries = kv_list or []
    return dict((entry['key'], entry['value']) for entry in entries)


def _is_production_cli() -> int:
    """Return exit code 0 if the deployment environment is production, else 1."""
    return int(not is_production())


def _is_staging_cli() -> int:
    """Return exit code 0 if the deployment environment is staging, else 1."""
    return int(not is_staging())


def _deployment_environment_cli() -> None:
    """Write the name of the deployment environment to stdout."""
    environment = deployment_environment()
    print(environment)


def sanitize_kcidb_subtest_status(result: str) -> str:
    """Sanitize subtest status.

    Thin wrapper that emits a DeprecationWarning and forwards to
    `cki_lib.kcidb.validate.sanitize_kcidb_status`.
    """
    # NOTE: Imported here rather than at module level so that `jsonschema` is
    # not required to use this module, and to avoid a cyclic import.
    # pylint: disable=import-outside-toplevel,cyclic-import;
    from cki_lib.kcidb.validate import sanitize_kcidb_status

    message = ("Function `cki_lib.misc.sanitize_kcidb_subtest_status` will be deprecated soon,"
               " please use `cki_lib.kcidb.validate.sanitize_kcidb_status`.")
    # stacklevel=2 attributes the warning to the caller of this function.
    warn(message, DeprecationWarning, stacklevel=2)

    return sanitize_kcidb_status(result)


def beaker_fetch_url_to_web_url(url: str) -> str:
    """Convert a Beaker fetch URL to a browsable web URL.

    The URL fragment is interpreted as the path inside the repository and
    appended to the tree URL. Recognized sources are the kernel-tests git
    mirrors and lookaside archives, GitLab and GitHub archive downloads, and
    git:// URLs on gerrit hosts. Any URL that is not recognized is returned
    unchanged.
    """
    # pylint: disable=too-many-return-statements
    parts = urlsplit(url)
    fragment = parts.fragment.strip('/')

    if url.startswith('git://repo.usersys.redhat.com/kernel-tests-public?'):
        return ('https://gitlab.com/redhat/centos-stream/tests/kernel/kernel-tests/'
                f'-/tree/production/{fragment}')
    if url.startswith('git://repo.usersys.redhat.com/kernel-tests?'):
        return ('https://gitlab.cee.redhat.com/kernel-qe/kernel/'
                f'-/tree/master/{fragment}')
    if url.startswith('https://s3.amazonaws.com/arr-cki-prod-lookaside/lookaside/'
                      'kernel-tests-public/kernel-tests-production.'):
        return ('https://gitlab.com/redhat/centos-stream/tests/kernel/kernel-tests/'
                f'-/tree/production/{fragment}')
    if 'gitlab' in parts.netloc:
        # <project>/-/archive/<ref>/<file> -> <project>/-/tree/<ref>/<fragment>
        if match := re.fullmatch(r'(.+)/-/archive/([^/]+)/[^/]+',
                                 parts.path):
            return f'{parts.scheme}://{parts.netloc}{match[1]}/-/tree/{match[2]}/{fragment}'
    if parts.netloc == 'github.com':
        # <project>/archive/[refs/heads/]<ref>.zip|.tar.gz; the dot in front of
        # the extension is escaped so that only a literal dot is accepted.
        if match := re.fullmatch(r'(.+)/archive/(refs/heads/)?([^/]+)\.(zip|tar\.gz)',
                                 parts.path):
            return f'{parts.scheme}://{parts.netloc}{match[1]}/tree/{match[3]}/{fragment}'
    if parts.scheme == 'git' and 'gerrit' in parts.netloc:
        path = parts.path.strip('/')
        # The query string, if any, is passed on as the ?h= parameter.
        query = f'?h={parts.query}' if parts.query else ''
        return f'https://{parts.netloc}/git/{path}/tree/{fragment}{query}'

    return url


def to_kebab_case(string: str) -> str:
    """Convert a string to kebab-case.

    A run of consecutive capitals is treated as a single word, every capital
    starts a new word, and any run of dashes, dots, underscores, whitespace or
    slashes becomes a single dash. Leading and trailing dashes are dropped.
    """
    def _tame_uppercase_run(match):
        # Keep the first capital and lowercase the remainder of the run.
        return match[1] + match[2].lower()

    def _dash_before_word(match):
        return '-' + match[0].lower()

    tamed = re.sub(r'([A-Z])([A-Z]*)', _tame_uppercase_run, string)
    dashed = re.sub(r'[A-Z]', _dash_before_word, tamed)
    collapsed = re.sub(r'^[-._\s/]*|[-._\s/]+', '-', dashed)
    return re.sub(r'^-|-$', '', collapsed)


def to_pascal_case(string: str) -> str:
    """Convert a string to PascalCase.

    The string is first normalized to kebab-case; then the first character of
    every dash-separated word is uppercased and the dashes are removed.
    """
    def _capitalize(match):
        return match.group(1).upper()

    kebab = to_kebab_case(string)
    return re.sub(r'(?:^|-)(.)', _capitalize, kebab)


def to_snake_case(string: str) -> str:
    """Convert a string to snake_case via its kebab-case form."""
    return to_kebab_case(string).replace('-', '_')


def to_screaming_snake_case(string: str) -> str:
    """Convert a string to SCREAMING_SNAKE_CASE via its kebab-case form."""
    return to_kebab_case(string).replace('-', '_').upper()
